package com.nulldev.util.internal.backport.httpclient_rw;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLPermission;
import java.security.AccessControlContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;

import com.nulldev.util.internal.backport.concurrency9.concurrent.CompletableFuture;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Log;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Logger;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.MinimalFuture;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Utils;

/**
 * One request/response exchange (also handles 100/101 intermediate responses).
 * A depth field is used to track the number of times a new request is sent for
 * a given API request. If the limit is exceeded, an exception is thrown.
 *
 * The security check is performed here:
 * <ul>
 * <li>uses the AccessControlContext captured at API level</li>
 * <li>checks for the appropriate URLPermission for the request</li>
 * <li>if permission is allowed, grants the equivalent SocketPermission to the
 * call</li>
 * <li>in case of a direct HTTP proxy, additionally checks for access to the
 * proxy (CONNECT proxying uses its own Exchange, so the check is done
 * there)</li>
 * </ul>
 */
final class Exchange<T> {

	// Debug logger; call sites in this class guard it with debug.on().
	final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

	// The request carried by this exchange and the client that issued it.
	final HttpRequestImpl request;
	final HttpClientImpl client;
	// Protocol-specific implementation. Null until establishExchange() has
	// produced one; replaced by a Stream after a successful HTTP/2 upgrade
	// (see checkForUpgradeAsync).
	volatile ExchangeImpl<T> exchImpl;
	// The future of the impl currently being established; reset to null by
	// establishExchange() once that future completes.
	volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
	// Set by checkFor407 when a synthetic 407 response is built; reset to
	// null at the start of responseAsyncImpl0.
	volatile CompletableFuture<Void> bodyIgnored;

	// used to record possible cancellation raised before the exchImpl
	// has been established.
	private volatile IOException failed;
	// Context passed to SecurityManager.checkPermission in checkPermissions().
	final AccessControlContext acc;
	// The MultiExchange that owns this exchange; the executor and push group
	// below are copied from it by the constructors.
	final MultiExchange<T> multi;
	final Executor parentExecutor;
	boolean upgrading; // to HTTP/2
	final PushGroup<T> pushGroup;
	// Value returned by dbgString(); both constructors set it to "Exchange".
	final String dbgTag;

	// Keeps track of the underlying connection when establishing an HTTP/2
	// exchange so that it can be aborted/timed out mid setup.
	final ConnectionAborter connectionAborter = new ConnectionAborter();

	/**
	 * Creates an exchange that uses the AccessControlContext held by the
	 * given MultiExchange.
	 *
	 * @param request the request to send
	 * @param multi   the owning MultiExchange; supplies the client, executor,
	 *                push group and access control context
	 */
	Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
		this(request, multi, multi.acc);
	}

	/**
	 * Creates an exchange that uses an explicitly supplied
	 * AccessControlContext instead of the one held by the MultiExchange.
	 *
	 * @param request the request to send
	 * @param multi   the owning MultiExchange; supplies the client, executor
	 *                and push group
	 * @param acc     the context used for permission checks
	 */
	Exchange(HttpRequestImpl request, MultiExchange<T> multi, AccessControlContext acc) {
		// State taken from the owning MultiExchange.
		this.multi = multi;
		this.client = multi.client();
		this.parentExecutor = multi.executor;
		this.pushGroup = multi.pushGroup;

		// State specific to this exchange.
		this.request = request;
		this.acc = acc;
		this.upgrading = false;
		this.dbgTag = "Exchange";
	}

	/** Returns the push group copied from the owning MultiExchange. */
	PushGroup<T> getPushGroup() {
		return this.pushGroup;
	}

	/** Returns the executor copied from the owning MultiExchange. */
	Executor executor() {
		return this.parentExecutor;
	}

	/** Returns the request carried by this exchange. */
	public HttpRequestImpl request() {
		return this.request;
	}

	/** Returns the client obtained from the owning MultiExchange. */
	HttpClientImpl client() {
		return this.client;
	}

	/**
	 * Keeps track of the underlying connection when establishing an HTTP/2
	 * exchange so that it can be aborted/timed out mid setup.
	 */
	static final class ConnectionAborter {
		private volatile HttpConnection connection;

		/** Records the connection that closeConnection() should close. */
		void connection(HttpConnection connection) {
			this.connection = connection;
		}

		/**
		 * Forgets the recorded connection and closes it if there was one.
		 * Anything thrown by close() is deliberately swallowed.
		 */
		void closeConnection() {
			final HttpConnection toClose = this.connection;
			this.connection = null;
			if (toClose == null) {
				return;
			}
			try {
				toClose.close();
			} catch (Throwable ignored) {
				// ignore
			}
		}
	}

	/**
	 * Reads the response body with the given handler.
	 *
	 * @param handler the body handler to apply
	 * @return a future completed with the body, or with null when the body was
	 *         already marked as ignored
	 */
	public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
		// A 407 received while establishing the exchange leaves no body to
		// read: bodyIgnored is then set, and exchImpl will be null if we were
		// trying to establish an HTTP/2 tunnel through an HTTP/1.1 proxy.
		if (bodyIgnored != null) {
			return MinimalFuture.completedFuture(null);
		}

		// The connection will not be returned to the pool in the case of WebSocket
		final boolean returnToPool = !request.isWebSocket();
		return exchImpl.readBodyAsync(handler, returnToPool, parentExecutor)
				.whenComplete((body, error) -> exchImpl.completed());
	}

	/**
	 * Called after a redirect or similar kind of retry where a body might be
	 * sent but we don't want it. Should send a RESET in h2. For http/1.1 we
	 * can consume a small quantity of data, or close the connection in other
	 * cases.
	 *
	 * @return the recorded bodyIgnored future if one is set, otherwise the
	 *         future returned by the exchange impl
	 */
	public CompletableFuture<Void> ignoreBody() {
		return bodyIgnored != null ? bodyIgnored : exchImpl.ignoreBody();
	}

	/**
	 * Called when a new exchange is created to replace this exchange. At this
	 * point it is guaranteed that readBody/readBodyAsync will not be called.
	 * Forwards the notification to the exchange impl, if one has been set.
	 */
	public void released() {
		final ExchangeImpl<?> current = exchImpl;
		if (current == null) {
			return;
		}
		current.released();
		// exchImpl is intentionally NOT cleared here. It has to stay
		// reachable until wrapForUpgrade replaces it with a Stream: nulling
		// it could let it be GC'ed too early, because the Http1Response is
		// now only weakly referenced by the Selector.
	}

	/**
	 * Cancels this exchange. May be called concurrently with, or before, the
	 * establishment of the exchange impl, in which case the cancellation
	 * cannot be propagated right away.
	 */
	public void cancel() {
		if (exchImpl == null) {
			// No impl yet, so it cannot be cancelled directly. Go through
			// cancel(IOException), which takes care of the race between
			// impl establishment and cancellation.
			cancel(new IOException("Request cancelled"));
			return;
		}
		exchImpl.cancel();
	}

	/**
	 * Cancels this exchange with the given cause.
	 *
	 * If the exchange impl is already set, the cause is propagated to it
	 * right away. Otherwise the cause is recorded in {@code failed}, any
	 * connection being set up is closed, and checkCancelled() is invoked to
	 * re-check whether an impl has appeared in the meantime.
	 *
	 * @param cause the exception to cancel the exchange with
	 */
	public void cancel(IOException cause) {
		if (debug.on())
			debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);
		// If the impl is non null, propagate the exception right away.
		// Otherwise record it so that it can be propagated once the
		// exchange impl has been established.
		ExchangeImpl<?> impl = exchImpl;
		if (impl != null) {
			// propagate the exception to the impl; log the snapshot that is
			// actually being cancelled rather than re-reading the volatile
			// field, which may have been replaced in the meantime.
			if (debug.on())
				debug.log("Cancelling exchImpl: %s", impl);
			impl.cancel(cause);
		} else {
			// no impl yet. record the exception
			failed = cause;

			// abort/close the connection if setting up the exchange. This can
			// be important when setting up HTTP/2
			connectionAborter.closeConnection();

			// now call checkCancelled to recheck the impl.
			// if the failed state is set and the impl is not null, reset
			// the failed state and propagate the exception to the impl.
			checkCancelled();
		}
	}

	// This method will raise an exception if one was reported and if
	// it is possible to do so. If the exception can be raised, then
	// the failed state will be reset. Otherwise, the failed state
	// will persist until the exception can be raised and the failed state
	// can be cleared.
	// Takes care of possible race conditions.
	//
	// Steps:
	// 1. If a failure is recorded, snapshot failed, exchImpl and exchangeCF
	//    together while holding this object's monitor.
	// 2. Nothing recorded: return.
	// 3. Impl available: cancel it with the recorded cause and clear failed.
	// 4. No impl yet: log, and fail the pending exchangeCF (if any) with the
	//    cause; failed stays set so a later call can still propagate it.
	private void checkCancelled() {
		ExchangeImpl<?> impl = null;
		IOException cause = null;
		CompletableFuture<? extends ExchangeImpl<T>> cf = null;
		// Unsynchronized pre-check of the volatile field; the values actually
		// used below are read inside the synchronized block.
		if (failed != null) {
			synchronized (this) {
				cause = failed;
				impl = exchImpl;
				cf = exchangeCF;
			}
		}
		if (cause == null)
			return;
		if (impl != null) {
			// The exception is raised by propagating it to the impl.
			if (debug.on())
				debug.log("Cancelling exchImpl: %s", impl);
			impl.cancel(cause);
			failed = null;
		} else {
			Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set." + "\n\tCan''t cancel yet with {2}", request.uri(),
					request.timeout().isPresent() ?
					// calling duration.toMillis() can throw an exception.
					// this is just debugging, we don't care if it overflows.
							(request.timeout().get().getSeconds() * 1000 + request.timeout().get().getNano() / 1000000) : -1,
					cause);
			// Fail the in-flight establishment so its dependents see the cause.
			if (cf != null)
				cf.completeExceptionally(cause);
		}
	}

	/**
	 * Marks this exchange as upgrading to HTTP/2 and asks the request to set
	 * itself up for the upgrade using the client's HTTP/2 client.
	 */
	public void h2Upgrade() {
		this.upgrading = true;
		this.request.setH2Upgrade(this.client.client2());
	}

	/**
	 * Returns the recorded cancellation cause, or null if none is currently
	 * recorded. Reads the field while holding this object's monitor.
	 */
	synchronized IOException getCancelCause() {
		return this.failed;
	}

	/**
	 * Gets/sets the exchange impl, solving race condition issues with
	 * potential concurrent calls to cancel() or cancel(IOException).
	 *
	 * @param connection passed through to ExchangeImpl.get; responseAsync()
	 *                   supplies null
	 * @return a future completed with the established impl, or failed with the
	 *         cancellation cause if the exchange was cancelled meanwhile
	 */
	private CompletableFuture<? extends ExchangeImpl<T>> establishExchange(HttpConnection connection) {
		if (debug.on()) {
			debug.log("establishing exchange for %s,%n\t proxy=%s", request, request.proxy());
		}
		// check if we have been cancelled first.
		final Throwable earlyCause = getCancelCause();
		checkCancelled();
		if (earlyCause != null) {
			return MinimalFuture.failedFuture(earlyCause);
		}

		final CompletableFuture<? extends ExchangeImpl<T>> pending = ExchangeImpl.get(this, connection);
		// We should probably use a VarHandle to get/set exchangeCF
		// instead - as we need CAS semantics.
		synchronized (this) {
			exchangeCF = pending;
		}
		// Once establishment finishes, stop advertising the pending future,
		// unless it has already been replaced by another one.
		final CompletableFuture<? extends ExchangeImpl<T>> tracked = pending.whenComplete((r, x) -> {
			synchronized (Exchange.this) {
				if (exchangeCF == pending)
					exchangeCF = null;
			}
		});
		checkCancelled();
		return tracked.thenCompose((eimpl) -> {
			// recheck for cancelled, in case of race conditions
			exchImpl = eimpl;
			final IOException lateCause = getCancelCause();
			checkCancelled();
			if (lateCause != null) {
				return MinimalFuture.failedFuture(lateCause);
			}
			// Now we're good to go. Because exchImpl is no longer
			// null cancel() will be able to propagate directly to
			// the impl after this point ( if needed ).
			return MinimalFuture.completedFuture(eimpl);
		});
	}

	// Completed HttpResponse will be null if response succeeded
	// will be a non null responseAsync if expect continue returns an error

	/**
	 * Starts the exchange without supplying a connection: equivalent to
	 * {@code responseAsyncImpl(null)}.
	 */
	public CompletableFuture<Response> responseAsync() {
		final HttpConnection noConnection = null;
		return responseAsyncImpl(noConnection);
	}

	/**
	 * Runs the permission checks and, if they pass, starts the exchange.
	 *
	 * @param connection passed through to responseAsyncImpl0; may be null
	 * @return the response future, or a future failed with the
	 *         SecurityException returned by checkPermissions()
	 */
	CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
		final SecurityException denied = checkPermissions();
		if (denied == null) {
			return responseAsyncImpl0(connection);
		}
		return MinimalFuture.failedFuture(denied);
	}

	/**
	 * Checks whether the previous stage completed exceptionally with
	 * ProxyAuthenticationRequired. If so, a synthetic Response built from the
	 * proxy response embedded in the exception is returned and bodyIgnored is
	 * set. Any other failure is propagated. Otherwise we proceed with
	 * {@code andThen}.
	 *
	 * @param ex      the exchange impl produced by the previous stage; may be
	 *                null when that stage failed
	 * @param t       the failure of the previous stage, or null on success
	 * @param andThen the continuation to apply when there was no failure
	 */
	private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t, Function<ExchangeImpl<T>, CompletableFuture<Response>> andThen) {
		final Throwable cause = Utils.getCompletionCause(t);
		if (cause == null) {
			if (debug.on())
				debug.log("checkFor407: all clear");
			return andThen.apply(ex);
		}
		if (!(cause instanceof ProxyAuthenticationRequired)) {
			if (debug.on())
				debug.log("checkFor407: no response - %s", (Object) cause);
			return MinimalFuture.failedFuture(cause);
		}

		if (debug.on())
			debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
		bodyIgnored = MinimalFuture.completedFuture(null);
		final Response proxyResponse = ((ProxyAuthenticationRequired) cause).proxyResponse;
		final HttpConnection c = (ex != null) ? ex.connection() : null;
		final Response syntheticResponse = new Response(request, this, proxyResponse.headers, c, proxyResponse.statusCode, proxyResponse.version, true);
		return MinimalFuture.completedFuture(syntheticResponse);
	}

	/**
	 * After sending the request headers, if no ProxyAuthenticationRequired was
	 * raised and the expectContinue flag is on, we need to wait for the
	 * 100-Continue response before sending the body.
	 *
	 * On 100 the body is sent and the following response is returned. On any
	 * other status the body of that response is ignored and the response
	 * itself is returned, except for a 101 while upgrading, which fails.
	 */
	private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
		assert request.expectContinue();
		return ex.getResponseAsync(parentExecutor).thenCompose((Response interim) -> {
			Log.logResponse(interim::toString);
			final int status = interim.statusCode();
			if (status == 100) {
				Log.logTrace("Received 100-Continue: sending body");
				if (debug.on())
					debug.log("Received 100-Continue for %s", interim);
				final CompletableFuture<Response> afterBody = exchImpl.sendBodyAsync().thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
				return wrapForLog(wrapForUpgrade(afterBody));
			}

			Log.logTrace("Expectation failed: Received {0}", status);
			if (debug.on())
				debug.log("Expect-Continue failed (%d) for: %s", status, interim);
			if (upgrading && status == 101) {
				final IOException unexpected101 = new IOException("Unable to handle 101 while waiting for 100");
				return MinimalFuture.failedFuture(unexpected101);
			}
			return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor).thenApply(v -> interim);
		});
	}

	/**
	 * After sending the request headers, if no ProxyAuthenticationRequired was
	 * raised and the expectContinue flag is off, we can immediately send the
	 * request body and proceed to the response.
	 */
	private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
		assert !request.expectContinue();
		if (debug.on())
			debug.log("sendRequestBody");
		final CompletableFuture<Response> responseCF = ex.sendBodyAsync().thenCompose(sent -> sent.getResponseAsync(parentExecutor));
		// Add the upgrade check first, then response logging.
		return wrapForLog(wrapForUpgrade(responseCF));
	}

	/**
	 * Builds the asynchronous pipeline for this exchange: establish the
	 * exchange impl, send the headers, then either wait for 100-Continue or
	 * send the body, checking for a 407 after each of the first two steps.
	 *
	 * @param connection passed through to establishExchange; may be null
	 */
	CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
		bodyIgnored = null;
		final boolean waitFor100 = request.expectContinue();
		if (waitFor100) {
			request.addSystemHeader("Expect", "100-Continue");
			Log.logTrace("Sending Expect: 100-Continue");
		}
		// Either wait for 100-Continue before sending the body, or send the
		// request body and proceed.
		final Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check = waitFor100 ? this::expectContinue : this::sendRequestBody;

		// The ProxyAuthenticationRequired can be triggered either by
		// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1
		// proxy) or by sendHeadersAsync (case of HTTP/1.1 SSL tunneling through
		// HTTP/1.1 proxy). Therefore we handle it with a call to
		// checkFor407(...) after these two places.
		final Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check = (ex) -> ex.sendHeadersAsync()
				.handle((r, t) -> this.checkFor407(r, t, after407Check)).thenCompose(Function.identity());
		return establishExchange(connection)
				.handle((r, t) -> this.checkFor407(r, t, afterExch407Check))
				.thenCompose(Function.identity());
	}

	/**
	 * When upgrading, chains the upgrade check after the given response
	 * future; otherwise returns the future unchanged.
	 */
	private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
		if (!upgrading) {
			return cf;
		}
		return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
	}

	/**
	 * When request logging is enabled, chains a stage that logs the response
	 * and passes it through; otherwise returns the future unchanged.
	 */
	private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
		if (!Log.requests()) {
			return cf;
		}
		return cf.thenApply(response -> {
			Log.logResponse(response::toString);
			return response;
		});
	}

	/**
	 * Body handler used when the response body is not wanted: returns a
	 * subscriber that replaces the body with null. The response info is not
	 * consulted.
	 */
	HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
		final T noBody = null;
		return HttpResponse.BodySubscribers.replacing(noBody);
	}

	// if this response was received in reply to an upgrade
	// then create the Http2Connection from the HttpConnection
	// initialize it and wait for the real response on a newly created Stream

	/**
	 * Handles the response to an HTTP/2 upgrade request.
	 *
	 * If this exchange is upgrading and the response status is 101, the
	 * (empty) body of the 101 response is ignored, an Http2Connection is
	 * created from the HTTP/1.1 connection, the exchange impl is switched to
	 * stream 1 of that connection, and the real response is awaited on that
	 * stream. In every other case the given response is returned as is.
	 *
	 * @param resp the response received for the upgrade request
	 * @param ex   the exchange impl that produced {@code resp}; cast to
	 *             Http1Exchange when the upgrade is accepted
	 * @return a future for the response to hand back to the caller
	 */
	private CompletableFuture<Response> checkForUpgradeAsync(Response resp, ExchangeImpl<T> ex) {

		int rcode = resp.statusCode();
		if (upgrading && (rcode == 101)) {
			Http1Exchange<T> e = (Http1Exchange<T>) ex;
			// check for 101 switching protocols
			// 101 responses are not supposed to contain a body.
			// => should we fail if there is one?
			if (debug.on())
				debug.log("Upgrading async %s", e.connection());
			return e.readBodyAsync(this::ignoreBody, false, parentExecutor).thenCompose((T v) -> {// v is null
				// guarded like every other debug call in this class
				if (debug.on())
					debug.log("Ignored body");
				// we pass e::drainLeftOverBytes to allow the ByteBuffers to
				// accumulate while we build the Http2Connection
				return Http2Connection.createAsync(e.connection(), client.client2(), this, e::drainLeftOverBytes).thenCompose((Http2Connection c) -> {
					boolean cached = c.offerConnection();
					Stream<T> s = c.getStream(1);

					if (s == null) {
						// s can be null if an exception occurred
						// asynchronously while sending the preface.
						Throwable t = c.getRecordedCause();
						IOException ioe;
						if (t != null) {
							if (!cached)
								c.close();
							ioe = new IOException("Can't get stream 1: " + t, t);
						} else {
							ioe = new IOException("Can't get stream 1");
						}
						return MinimalFuture.failedFuture(ioe);
					}
					exchImpl.released();
					Throwable t;
					// There's a race condition window where an external
					// thread (SelectorManager) might complete the
					// exchange in timeout at the same time where we're
					// trying to switch the exchange impl.
					// 'failed' will be reset to null after
					// exchImpl.cancel() has completed, so either we
					// will observe failed != null here, or we will
					// observe e.getCancelCause() != null, or the
					// timeout exception will be routed to 's'.
					// Either way, we need to relay it to s.
					synchronized (this) {
						exchImpl = s;
						t = failed;
					}
					// Check whether the HTTP/1.1 was cancelled.
					if (t == null)
						t = e.getCancelCause();
					// if HTTP/1.1 exchange was timed out, don't
					// try to go further.
					if (t instanceof HttpTimeoutException) {
						s.cancelImpl(t);
						return MinimalFuture.failedFuture(t);
					}
					if (debug.on())
						debug.log("Getting response async %s", s);
					return s.getResponseAsync(null);
				});
			});
		}
		return MinimalFuture.completedFuture(resp);
	}

	/**
	 * Returns the URI against which the request's permission is checked: the
	 * request URI, or a {@code socket://host:port} URI built from the request
	 * authority when the method is CONNECT (compared ignoring case).
	 *
	 * NOTE(review): this method compares with equalsIgnoreCase while
	 * permissionForServer and checkPermissions use equals - confirm whether
	 * the difference is intended.
	 */
	private URI getURIForSecurityCheck() {
		final String method = request.method();
		final InetSocketAddress authority = request.authority();
		final URI uri = request.uri();

		// CONNECT should be restricted at API level
		if (!method.equalsIgnoreCase("CONNECT")) {
			return uri;
		}
		try {
			return new URI("socket", null, authority.getHostString(), authority.getPort(), null, null, null);
		} catch (URISyntaxException e) {
			throw new InternalError(e); // shouldn't happen
		}
	}

	/**
	 * Returns the security permission required for the given details. If the
	 * method is CONNECT, then uri must be of the form "scheme://host:port".
	 *
	 * @param uri     the URI the permission is for
	 * @param method  the request method
	 * @param headers the user headers; only the header names are used
	 */
	private static URLPermission permissionForServer(URI uri, String method, Map<String, List<String>> headers) {
		if (!method.equals("CONNECT")) {
			return Utils.permissionForServer(uri, method, headers.keySet().stream());
		}
		return new URLPermission(uri.toString(), "CONNECT");
	}

	/**
	 * Performs the necessary security permission checks required to retrieve
	 * the response. Returns a security exception representing the denied
	 * permission, or null if all checks pass or there is no security manager.
	 *
	 * CONNECT requests are not checked here. For every other method the
	 * server permission is checked first; then, if the client has a proxy
	 * selector and a proxy permission applies to this request, that permission
	 * is checked as well.
	 *
	 * @return the SecurityException raised by a failed check, or null
	 */
	private SecurityException checkPermissions() {
		String method = request.method();
		SecurityManager sm = System.getSecurityManager();
		if (sm == null || method.equals("CONNECT")) {
			// tunneling will have a null acc, which is fine. The proxy
			// permission check will have already been performed.
			return null;
		}

		HttpHeaders userHeaders = request.getUserHeaders();
		URI u = getURIForSecurityCheck();
		URLPermission p = permissionForServer(u, method, userHeaders.map());

		try {
			assert acc != null;
			sm.checkPermission(p, acc);
		} catch (SecurityException e) {
			return e;
		}
		ProxySelector ps = client.proxySelector();
		if (ps != null) {
			// The method cannot be CONNECT here (see the early return above),
			// so this is a non-tunneling HTTP proxy. Need to check access.
			URLPermission proxyPerm = Utils.permissionForProxy(request.proxy());
			if (proxyPerm != null) {
				try {
					sm.checkPermission(proxyPerm, acc);
				} catch (SecurityException e) {
					return e;
				}
			}
		}
		return null;
	}

	/** Returns the HTTP version reported by the owning MultiExchange. */
	HttpClient.Version version() {
		return this.multi.version();
	}

	/** Returns the debug tag assigned at construction. */
	String dbgString() {
		return this.dbgTag;
	}
}
